package com.offcn.bigdata.streaming.p1

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

/**
  * Spark Streaming + Kafka integration using the receiver-less direct stream
  * (spark-streaming-kafka-0-10 connector).
  *
  * This program acts as a Kafka consumer. Helper commands to create the topic
  * and feed it test messages:
  *   bin/kafka-topics.sh --create --topic hadoop --zookeeper bigdata01:2181/kafka --partitions 3 --replication-factor 1
  *   bin/kafka-console-producer.sh --topic hadoop --broker-list bigdata01:9092
  */
object _04StreamingReadFromKafkaOps {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
            // Fix: the app name was copy-pasted from the HDFS example
            // ("_03StreamingReadDataFromHDFSOps"); it now matches this object.
            .setAppName("_04StreamingReadFromKafkaOps")
            .setMaster("local")
        // Micro-batch interval: one RDD is produced every 2 seconds.
        val batchInterval = Seconds(2)
        val ssc = new StreamingContext(conf, batchInterval)

        val topics = Set("hadoop")
        val kafkaParams = Map[String, Object](
            // Fix: use classOf consistently for both deserializers (the key was
            // previously given as a string class name while the value used classOf).
            "key.deserializer" -> classOf[StringDeserializer],
            "value.deserializer" -> classOf[StringDeserializer],
            "bootstrap.servers" -> "bigdata01:9092,bigdata02:9092,bigdata03:9092",
            "group.id" -> "spark-kafka-group-0829-1",
            // Start from the beginning of the topic when the group has no committed offset.
            "auto.offset.reset" -> "earliest",
            // NOTE(review): the Spark Kafka integration guide recommends
            // enable.auto.commit=false plus manual commitAsync for at-least-once
            // semantics; auto-commit is kept here to preserve this demo's behavior.
            "enable.auto.commit" -> "true"
        )

        // Direct stream: each RDD partition corresponds to a Kafka topic partition.
        val kafkaStream: InputDStream[ConsumerRecord[String, String]] =
            KafkaUtils.createDirectStream(
                ssc,
                // Distribute partitions evenly across available executors.
                LocationStrategies.PreferConsistent,
                ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
            )

        kafkaStream.foreachRDD((rdd, bTime) => {
            // Skip empty batches so the batch header is only printed when data arrived.
            if (!rdd.isEmpty()) {
                println("-------------------------------------------")
                println(s"Time: $bTime")
                println("-------------------------------------------")
                rdd.foreach(record => println(record))
            }
        })

        // Start the streaming computation and block until it is terminated.
        ssc.start()
        ssc.awaitTermination()
    }
}
